Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optimize SortPreservingMergeExec to avoid merging non-overlapping partitions #13296

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

suremarc
Copy link
Contributor

@suremarc suremarc commented Nov 7, 2024

Which issue does this PR close?

Closes #10316.

Rationale for this change

What changes are included in this PR?

This PR uses the existing MinMaxStatistics to reorder the input streams into chains of non-overlapping streams based on the statistics knowledge of its input. This requires some changes:

Are these changes tested?

Yes, there is a new sqllogictest, optimize_sort_preserving_merge.slt

Are there any user-facing changes?

The MinMaxStatistics API is made public, as otherwise we can't use it in the core crate where it previously was being used.

There is a new ExecutionPlan::statistics_by_partition method with a default implementation, but it is not breaking.

There are also the new Statistics::merge and ColumnStatistics::merge functions.

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate labels Nov 7, 2024
Comment on lines -313 to -322
// First Fit:
// * Choose the first file group that a file can be placed into.
// * If it fits into no existing file groups, create a new one.
//
// By sorting files by min values and then applying first-fit bin packing,
// we can produce the smallest number of file groups such that
// files within a group are in order and non-overlapping.
//
// Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
// https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this and the relevant code into a new method, MinMaxStatistics::first_fit

Comment on lines +400 to +406
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
Ok(vec![
self.statistics()?;
self.properties().partitioning.partition_count()
])
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As stated in the PR description, this is what a proposed API would look like for statistics by partition, though it is certainly not final.

Comment on lines +104 to +107
// Helper function to get min/max statistics
let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
Ok(projected_statistics
.iter()
Copy link
Contributor Author

@suremarc suremarc Nov 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this code later so it uses the projected statistics, i.e. it only relies on stats for sorting columns. I was seeing the code error because some columns had unknown statistics. Hopefully this will reduce such cases.

@alamb alamb changed the title initial attempt at implementation initial attempt at non-overlapping range implementation Nov 8, 2024
@suremarc suremarc changed the title initial attempt at non-overlapping range implementation feat: Optimize SortPreservingMergeExec to avoid merging non-overlapping partitions Nov 8, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @suremarc -- this looks like a great start to me

/// into chains, such that elements in a chain are non-overlapping and ordered
/// amongst one another.
/// This bin-packing is optimal in the sense that it has the fewest number of chains.
pub fn first_fit(&self) -> Vec<Vec<usize>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we know there are no overlapping ranges here? It seems like we would also have to check if the ranges overlapped and if any did we can't do this packing

This may be checked elsewhere but I didn't see it in a cursory glance

Copy link
Contributor Author

@suremarc suremarc Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no ranges overlapped, they could all be ordered into a single chain. If some ranges do overlap, any ranges that overlap get placed into separate chains. The check for non-overlapping-ness happens in this logic:

let chain_to_insert = chains.iter_mut().find(|chain| {
// If our element is non-overlapping and comes _after_ the last element of the chain,
// it can be added to this chain.
min > self.max(
*chain
.last()
.expect("groups should be nonempty at construction"),
)
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see 👍

@github-actions github-actions bot added sqllogictest SQL Logic Tests (.slt) common Related to common crate labels Nov 11, 2024
Comment on lines +59 to +75
query TT
EXPLAIN
select a from t WHERE partition = 1
UNION all
select a from t WHERE partition = 2
ORDER BY a;
----
logical_plan
01)Sort: t.a ASC NULLS LAST
02)--Union
03)----TableScan: t projection=[a], full_filters=[t.partition = Int32(1)]
04)----TableScan: t projection=[a], full_filters=[t.partition = Int32(2)]
physical_plan
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST], partition_groups=[[2,0],[1]]
02)--UnionExec
03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=1/1_2.parquet]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST]
04)----ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/optimize_sort_preserving_merge/parquet_table/partition=2/2_2.parquet]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @alamb I was able to implement the statistics_by_partition for ParquetExec and UnionExec and I wrote a little test. It seems to work 🎉

In this case, files 0 (partition=1/1_1.parquet) and 2 (partition=2/2_2.parquet) are non-overlapping, but file 1 (partition=1/1_2.parquet) overlaps file 0, so it gets placed into another chain (group).

I noticed int32 columns didn't seem to have working parquet statistics, so I used a string column. Seems like we will need to plug a lot of holes to make this feature complete.

@suremarc suremarc requested a review from alamb November 11, 2024 22:15
@2010YOUY01
Copy link
Contributor

The implementation is really nice.
I'm wondering is it convenient to move the stream concat logic into StreamingMergeBuilder, like

let result = StreamingMergeBuilder::new()
    .with_streams(inputs)
    .with_statistics_by_stream(stats)
    .build(); // Concat non-overlapping input streams here

Now SortExec is implemented as 1. Sort several small runs 2. Create a internal SortPreservingMergeStream to merge all small runs. This way sort query can also benefit from this work with some additional effort

@alamb
Copy link
Contributor

alamb commented Nov 23, 2024

FYI an update here is that I don't think I am going to be able to work on Statistics for the next month or two. Though I think @mhilton from InfluxData was thinking of potentially helping (🎣 ).

@suremarc
Copy link
Contributor Author

FYI an update here is that I don't think I am going to be able to work on Statistics for the next month or two. Though I think @mhilton from InfluxData was thinking of potentially helping (🎣 ).

Ok. My team is pretty eager to get this optimization in before February-ish, so I think we may be able to spare a helper or two for the statistics-related changes. But obviously that would require someone available to review, also I think we would need a resolution in #13293 before proceeding.

@suremarc
Copy link
Contributor Author

The implementation is really nice. I'm wondering is it convenient to move the stream concat logic into StreamingMergeBuilder, like

let result = StreamingMergeBuilder::new()
    .with_streams(inputs)
    .with_statistics_by_stream(stats)
    .build(); // Concat non-overlapping input streams here

Now SortExec is implemented as 1. Sort several small runs 2. Create a internal SortPreservingMergeStream to merge all small runs. This way sort query can also benefit from this work with some additional effort

This didn't occur to me but I think it would be a great change. On the other hand I'm considering if it would make sense in a follow-on PR. But in any case there's a lot of statistics-related work that will need to be done before this PR is mergeable, unfortunately.

@alamb
Copy link
Contributor

alamb commented Nov 25, 2024

FYI an update here is that I don't think I am going to be able to work on Statistics for the next month or two. Though I think @mhilton from InfluxData was thinking of potentially helping (🎣 ).

Ok. My team is pretty eager to get this optimization in before February-ish, so I think we may be able to spare a helper or two for the statistics-related changes. But obviously that would require someone available to review, also I think we would need a resolution in #13293 before proceeding.

I will find time to review / help it along. Also, given you are willing to help I will help drive a resolution on #13293

@berkaysynnada
Copy link
Contributor

I will find time to review / help it along. Also, given you are willing to help I will help drive a resolution on #13293

We are also willing to contribute the final design of #13293, and I can help the review of this.

Copy link

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale PR has not had any activity for some time label Jan 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt) Stale PR has not had any activity for some time
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Optimized version of SortPreservingMerge that doesn't actually compare sort keys of the key ranges are ordered
4 participants